package drds.plus.executor.cursor.cursor.impl.join;

import drds.plus.executor.cursor.cursor.IMergeSortJoinCursor;
import drds.plus.executor.cursor.cursor.ISortingCursor;
import drds.plus.executor.cursor.cursor_metadata.CursorMetaDataImpl;
import drds.plus.executor.row_values.ArrayRowValues;
import drds.plus.executor.row_values.RowValues;
import drds.plus.executor.row_values.row_value.RowValueScaners;
import drds.plus.executor.utils.Utils;
import drds.plus.sql_process.abstract_syntax_tree.configuration.ColumnMetaData;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.Join;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.Item;
import drds.plus.sql_process.abstract_syntax_tree.expression.order_by.OrderBy;
import drds.plus.util.GeneralUtil;

import java.util.*;

/**
 * <pre>
 * sort sort join.
 * 实现 支持inner join，leftNode join，rightNode join，outter join
 * 要求两边均按照join列完全有序
 * <a>http://en.wikipedia.org/wiki/Sort-merge_join</a>
 *
 *
 *  function sortMerge(relation leftNode, relation rightNode, attribute a)
 *      var relation output
 *      var comparativeList left_sorted := sort(leftNode, a) // Relation leftNode sorted on attribute a
 *      var comparativeList right_sorted := sort(rightNode, a)
 *      var attribute leftRowData, rightSideRowData
 *      var setLimitValue left_subset, right_subset // These sets discarded except where join predicate is satisfied
 *      advance(left_subset, left_sorted, leftRowData, a)
 *      advance(right_subset, right_sorted, rightSideRowData, a)
 *      while not empty(left_subset) and not empty(right_subset)
 *          if leftRowData = rightSideRowData // JoinImpl predicate satisfied
 *              add cross product of left_subset and right_subset to output
 *              advance(left_subset, left_sorted, leftRowData, a)
 *              advance(right_subset, right_sorted, rightSideRowData, a)
 *          else if leftRowData < rightSideRowData
 *             advance(left_subset, left_sorted, leftRowData, a)
 *          else // leftRowData > rightSideRowData
 *             advance(right_subset, right_sorted, rightSideRowData, a)
 *      return output
 *
 *  // Remove tuples from sorted to subset until the sorted[1].a value changes
 *  function advance(subset out, sorted inout, columnName out, a in)
 *      columnName := sorted[1].a
 *      subset := emptySet
 *      while not empty(sorted) and sorted[1].a = columnName
 *          insert sorted[1] into subset
 *          remove sorted[1]
 * </pre>
 */

public class SortMergeJoinCursor extends JoinCursor implements IMergeSortJoinCursor {
    protected RowValues current;
    private Iterator<RowValues> rowDataIterator = null;
    private RowValues leftSideRowData;
    private RowValues rightSideRowData;
    private boolean needAdvanceLeft = true;
    private boolean needAdvanceRight = true;
    private List<RowValues> leftSideRowDataList;
    private List<RowValues> rightSideRowDataList;
    private List<List<OrderBy>> joinOrderByList;

    public SortMergeJoinCursor(ISortingCursor leftSideCursor, ISortingCursor rightSideCursor, List leftJoinItemList, List rightJoinItemList) throws RuntimeException {
        super(leftSideCursor, leftJoinItemList, rightSideCursor, rightJoinItemList);
        this.leftCursor = leftSideCursor;
        this.rightCursor = rightSideCursor;
        // 暂时以右表的顺序为准，因为目前选择sort sort join主要是针对outter右表存在排序字段
        // 后续需要优化orderbys信息，针对sort sort join，左右表的顺序字段都是正确的
        this.orderByList = rightSideCursor.getOrderByList();
        this.joinOrderByList = new ArrayList<List<OrderBy>>();
        joinOrderByList.add(leftSideCursor.getOrderByList());
        joinOrderByList.add(rightSideCursor.getOrderByList());
    }

    public SortMergeJoinCursor(Join join, ISortingCursor leftSideCursor, ISortingCursor rightSideCursor, List leftJoinOnColumns, List rightJoinOnColumns) throws RuntimeException {
        this(leftSideCursor, rightSideCursor, leftJoinOnColumns, rightJoinOnColumns);
        setLeftRightJoin(join);
    }

    public RowValues next() throws RuntimeException {
        if (rowDataIterator != null && rowDataIterator.hasNext()) {
            this.current = rowDataIterator.next();
            return this.current;
        }

        if (needAdvanceLeft) {
            this.leftSideRowDataList = new LinkedList<RowValues>();
            leftSideRowData = advance(leftSideRowDataList, leftCursor, leftJoinItemList);
        }

        if (needAdvanceRight) {
            this.rightSideRowDataList = new LinkedList<RowValues>();
            rightSideRowData = advance(rightSideRowDataList, rightCursor, rightJoinItemList);
        }

        while (!leftSideRowDataList.isEmpty() && !rightSideRowDataList.isEmpty()) {
            int compare = compare(leftSideRowData, leftJoinItemList, rightSideRowData, rightJoinItemList);
            if (compare == 0) {
                this.needAdvanceLeft = true;
                this.needAdvanceRight = true;
                List<RowValues> rowDataList = across(leftSideRowDataList, rightSideRowDataList);
                rowDataIterator = rowDataList.iterator();
                this.current = rowDataIterator.next();
                return this.current;
            } else {
                // outter join情况下，没有消耗就不需要前移
                if (this.isLeftOutJoin() || this.isRightOutJoin()) {
                    needAdvanceLeft = false;
                    needAdvanceRight = false;
                }
                if (compare < 0) {
                    if (this.isLeftOutJoin()) {//右为空
                        this.needAdvanceLeft = true;
                        List<RowValues> rowDataList = across(leftSideRowDataList, getNullRowDataList(rightCursor.getColumnMetaDataList()));
                        rowDataIterator = rowDataList.iterator();
                        this.current = rowDataIterator.next();
                        return this.current;
                    }
                    //其他两种情况因为left<right所以需要移动
                    leftSideRowData = advance(leftSideRowDataList, leftCursor, leftJoinItemList);
                } else {
                    if (this.isRightOutJoin()) {//左为空
                        this.needAdvanceRight = true;
                        List<RowValues> rowDataList = across(getNullRowDataList(leftCursor.getColumnMetaDataList()), rightSideRowDataList);
                        rowDataIterator = rowDataList.iterator();
                        this.current = rowDataIterator.next();
                        return this.current;
                    }
                    //其他两种情况因为left<right所以需要移动
                    rightSideRowData = advance(rightSideRowDataList, rightCursor, rightJoinItemList);
                }
            }
        }
        //其中一边为空 一边不为空
        // outter join情况下，要将两个cursor都取完
        if (!(leftSideRowDataList.isEmpty() && rightSideRowDataList.isEmpty())) {
            if (leftSideRowDataList.isEmpty() && this.isRightOutJoin()) {
                this.needAdvanceRight = true;
                List<RowValues> rowDataList = across(getNullRowDataList(leftCursor.getColumnMetaDataList()), rightSideRowDataList);
                rowDataIterator = rowDataList.iterator();
                this.current = rowDataIterator.next();
                return this.current;
            }
            if (rightSideRowDataList.isEmpty() && this.isLeftOutJoin()) {
                this.needAdvanceLeft = true;
                List<RowValues> rowDataList = across(leftSideRowDataList, getNullRowDataList(rightCursor.getColumnMetaDataList()));
                rowDataIterator = rowDataList.iterator();
                this.current = rowDataIterator.next();
                return this.current;
            }
        }

        current = null;
        return current;
    }

    private List<RowValues> getNullRowDataList(List<ColumnMetaData> columnMetaDataList) {
        List<RowValues> rowDataList = new ArrayList<RowValues>(1);
        List<Object> valueList = new ArrayList<Object>(columnMetaDataList.size());
        for (int i = 0; i < columnMetaDataList.size(); i++) {
            valueList.add(null);
        }
        ArrayRowValues arrayRowData = new ArrayRowValues(CursorMetaDataImpl.buildCursorMetaData(columnMetaDataList), valueList.toArray());
        rowDataList.add(arrayRowData);
        return rowDataList;
    }

    /**
     * 两边做笛卡尔积，返回
     */
    private List<RowValues> across(List<RowValues> leftRowDataList, List<RowValues> rightRowDataList) throws RuntimeException {
        List<RowValues> rowDataList = new ArrayList<RowValues>(leftRowDataList.size() * rightRowDataList.size());
        for (RowValues leftRowData : leftRowDataList) {
            for (RowValues rightRowData : rightRowDataList) {
                rowDataList.add(join(leftRowData, rightRowData));
            }
        }

        return rowDataList;

    }

    private int compare(RowValues leftSideRowData, List<Item> leftSideItemList, RowValues rightSideRowData, List<Item> rightSideItemList) {
        Comparator comparator = RowValueScaners.getComparator(leftSideRowData.getParentCursorMetaData(), leftSideItemList, rightSideRowData.getParentCursorMetaData(), rightSideItemList);
        return comparator.compare(leftSideRowData, rightSideRowData);
    }

    private RowValues advance(List<RowValues> rowDataList, ISortingCursor cursor, List columns) throws RuntimeException {
        rowDataList.clear();
        if (cursor.current() == null) {
            if (cursor.next() == null) {
                return null;
            }
        }

        // 这里的数据要固化下来，否则next之后数据就丢了
        RowValues rowData = Utils.fromRowDataToArrayRowData(cursor.current());
        rowDataList.add(rowData);
        while (cursor.next() != null && compare(rowData, columns, cursor.current(), columns) == 0) {
            rowDataList.add(Utils.fromRowDataToArrayRowData(cursor.current()));
        }
        return rowData;
    }

    public List<List<OrderBy>> getJoinOrderByListList() {
        return joinOrderByList;
    }

    public String toString() {
        return toStringWithInden(0);
    }

    public String toStringWithInden(int inden) {
        StringBuilder sb = new StringBuilder();
        String subQueryTab = GeneralUtil.getTab(inden);
        Utils.printCursorMetaData(cursorMetaData, inden, sb);
        Utils.printOrderByList(orderByList, inden, sb);
        sb.append(subQueryTab).append("【Sort MergeQueryImpl JoinImpl : ").append("\n");
        sb.append(subQueryTab).append("leftCursor:").append("\n");
        sb.append(leftCursor.toStringWithInden(inden + 1));
        sb.append(subQueryTab).append("rightCursor:").append("\n");
        sb.append(rightCursor.toStringWithInden(inden + 1));

        return sb.toString();
    }
}
